package com.huan.flink.filter;

import com.huan.flink.dto.Animal;
import com.huan.flink.dto.Chicken;
import com.huan.flink.dto.Duck;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Objects;

/**
 * 需求： 根据数据的type进行判断，转换成不同的对象，分别做处理
 * 实现： 此处是通过filter来实现的，但是这样虽然实现了功能，但不是最好的，因为 同一份数据，需要经过多次过滤处理，浪费资源。假设在filter()方法
 * 中需要调用数据库获取数据，然后进行判断，那么在这个需求中，同一条数据，需要查询3次数据库，因为有3个filter方法
 *
 * @author huan.fu
 * @date 2023/9/23 - 10:01
 */
public class FilterStreamApplication {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Animal> ds = environment.fromElements(
                        new Animal("1", "鸭子"),
                        new Animal("1", "小鸭子"),
                        new Animal("2", "三黄鸡"),
                        new Animal("2", "大公鸡"),
                        new Animal("3", "其他动物")
                )
                .map(a -> a);

        // 过滤出鸭子进行操作、输出
        ds.filter(animal -> Objects.equals("1", animal.getType()))
                .map(animal -> new Duck(animal.getType(), animal.getName()))
                .print();
        // 过滤出鸡进行操作、输出
        ds.filter(animal -> Objects.equals("2", animal.getType()))
                .map(animal -> new Chicken(animal.getType(), animal.getName()))
                .print();
        // 过滤出其他进行操作、输出
        ds.filter(animal -> Objects.equals("3", animal.getType()))
                .print();


        environment.execute("分流");
    }
}
